package com.sohu3;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import org.apache.commons.io.filefilter.FileFilterUtils;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;

import com.util.PropertiesUtil;

/**
 * Command-line entry point that polls a directory for {@code *.imported} files and
 * forwards change notifications to a {@code KeywordFileAdapter}, which is given the
 * configured topic and a Kafka producer.
 *
 * <p>All settings ({@code topic}, {@code metadata.broker.list},
 * {@code keyword_backup_dir}) are read from {@code config.properties} through
 * {@code PropertiesUtil}.
 *
 * <p>NOTE(review): what the listener actually sends to Kafka is defined in
 * {@code KeywordFileAdapter}, which is not part of this file.
 */
public class KafkaProducer {
	/** Name of the properties resource that every setting in this class is read from. */
	private static final String CONFIG_FILE = "config.properties";

	/** Kafka producer handed to the file listener; closed by the JVM shutdown hook. */
	private final Producer<String, String> producer;

	/** Topic name taken from the {@code topic} key of {@code config.properties}. */
	public final static String TOPIC = PropertiesUtil.getKeyString(CONFIG_FILE, "topic");

	/**
	 * Builds the Kafka producer with string encoders for both key and message.
	 */
	private KafkaProducer() {
		Properties props = new Properties();
		props.put("metadata.broker.list", PropertiesUtil.getKeyString(CONFIG_FILE, "metadata.broker.list"));
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");
		// -1: per the Kafka 0.8 producer docs, wait for all in-sync replicas to acknowledge.
		props.put("request.required.acks", "-1");

		producer = new Producer<String, String>(new ProducerConfig(props));
	}

	/**
	 * Message producer: starts a monitor that polls the keyword directory every
	 * 5 seconds and notifies a {@code KeywordFileAdapter} about changes to regular
	 * files whose name ends in {@code .imported}.
	 *
	 * <p>A JVM shutdown hook is registered before the monitor is started so that
	 * the monitor is stopped and the producer is closed when the process exits,
	 * including the case where starting the monitor fails.
	 *
	 * @throws Exception if the monitor cannot be started
	 */
	private void produce() throws Exception {
		File keywordDir = new File(PropertiesUtil.getKeyString(CONFIG_FILE, "keyword_backup_dir"));
		// Polling interval: 5 seconds.
		long interval = TimeUnit.SECONDS.toMillis(5);
		// Observer restricted to regular files with the ".imported" suffix.
		FileAlterationObserver observer = new FileAlterationObserver(keywordDir, FileFilterUtils.and(
				FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(".imported")));
		// Register the listener that reacts to file changes.
		observer.addListener(new KeywordFileAdapter(TOPIC, producer));
		final FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer);

		// Release resources on JVM exit; previously neither the monitor nor the
		// producer was ever shut down.
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					monitor.stop();
				} catch (Exception e) {
					// stop() also throws when the monitor never started; shutdown must
					// continue regardless, so report and carry on to close the producer.
					System.err.println("Failed to stop file monitor: " + e);
				} finally {
					producer.close();
				}
			}
		}, "kafka-producer-shutdown"));

		monitor.start();
	}

	/**
	 * Creates the producer and starts directory monitoring. The monitor keeps
	 * polling on its own thread after this method returns.
	 *
	 * @param args unused
	 * @throws Exception if the producer or the monitor cannot be started
	 */
	public static void main(String[] args) throws Exception {
		new KafkaProducer().produce();
	}
}